package com.stormdemo;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * 组织各个处理组件形成一个完整的处理流程，就是所谓的topology(类似于mapreduce程序中的job)
 * 并且将该topology提交给storm集群去运行，topology提交到集群后就将永无休止地运行，除非人为或者异常退出
 * @author duanhaitao@itcast.cn
 * 运行：./storm jar demotop.jar com.stormdemo.TopoMain
 */
public class TopoMain {


	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();

		//将我们的spout组件设置到topology中去
		//parallelism_hint ：4  表示用4个excutor来执行这个组件
		//setNumTasks(8) 设置的是该组件执行时的并发task数量，也就意味着1个excutor会运行2个task
		builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);

		//将大写转换bolt组件设置到topology，并且指定它接收randomspout组件的消息
		//.shuffleGrouping("randomspout")包含两层含义：
		//1、upperbolt组件接收的tuple消息一定来自于randomspout组件
		//2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
		builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

		//将添加后缀的bolt组件设置到topology，并且指定它接收upperbolt组件的消息
		builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

		//用builder来创建一个topology
		StormTopology demotop = builder.createTopology();


		//配置一些topology在集群中运行时的参数
		Config conf = new Config();
		//这里设置的是整个demotop所占用的槽位数，也就是worker的数量
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);


		//将这个topology提交给storm集群运行
		StormSubmitter.submitTopology("demotopo", conf, demotop);

	}
}
